diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index ea4c85e3959..c1f971c3fe8 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1199,7 +1199,7 @@ AS 'pg_logical_slot_peek_binary_changes'; CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( IN slot_name name, IN immediately_reserve boolean DEFAULT false, - IN temporary boolean DEFAULT false, + IN temporary boolean DEFAULT false, IN restart_lsn pg_lsn DEFAULT '0/0'::pg_lsn, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index f2fd8f336ed..bbe85974db2 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1064,6 +1064,58 @@ ReplicationSlotReserveWal(void) } } +/* + * Similar to ReplicationSlotReserveWal, but not for the current LSN, but for + * the LSN from the past. Creates a physical replication slot if WAL segment + * with specified restart_lsn exists. + */ +void +ReplicationSlotReserveHistoryWal(XLogRecPtr restart_lsn) +{ + XLogSegNo segno; + XLogRecPtr restartRedoPtr; + TimeLineID restartTli; + char xlogfname[MAXFNAMELEN]; + char *filename; + struct stat buf; + + Assert(MyReplicationSlot != NULL); + Assert(MyReplicationSlot->data.restart_lsn == InvalidXLogRecPtr); + + if (!RecoveryInProgress() && !SlotIsLogical(MyReplicationSlot)) + { + XLByteToSeg(restart_lsn, segno, wal_segment_size); + GetOldestRestartPoint(&restartRedoPtr, &restartTli); + XLogFileName(xlogfname, restartTli, segno, wal_segment_size); + filename = psprintf("%s/pg_wal/%s", DataDir, xlogfname); + if (stat(filename, &buf) != 0) + { + pfree(filename); + ReplicationSlotDropAcquired(); + elog(ERROR, "WAL segment %s with specified LSN %X/%X is absent", + xlogfname, (uint32)(restart_lsn >> 32), (uint32)restart_lsn); + } + else + { + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.restart_lsn = restart_lsn; + SpinLockRelease(&MyReplicationSlot->mutex); + } + + /* prevent WAL removal as fast as possible */ + ReplicationSlotsComputeRequiredLSN(); + + if (stat(filename, &buf) != 0) + { + pfree(filename); + ReplicationSlotDropAcquired(); + elog(ERROR, "WAL segment with specified LSN %X/%X is absent", + (uint32)(restart_lsn >> 32), (uint32)restart_lsn); + } + pfree(filename); + } +} + /* * Flush all replication slots to disk. * diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 3f5944f2ad5..2cae02c06c6 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -44,7 +44,8 @@ check_permissions(void) */ static void create_physical_replication_slot(char *name, bool immediately_reserve, - bool temporary, XLogRecPtr restart_lsn) + bool temporary, XLogRecPtr restart_lsn, + bool historic_lsn) { Assert(!MyReplicationSlot); @@ -55,7 +56,12 @@ create_physical_replication_slot(char *name, bool immediately_reserve, if (immediately_reserve) { /* Reserve WAL as the user asked for it */ - if (XLogRecPtrIsInvalid(restart_lsn)) + if (historic_lsn) + { + Assert(!XLogRecPtrIsInvalid(restart_lsn)); + ReplicationSlotReserveHistoryWal(restart_lsn); + } + else if (XLogRecPtrIsInvalid(restart_lsn)) ReplicationSlotReserveWal(); else MyReplicationSlot->data.restart_lsn = restart_lsn; @@ -76,12 +82,16 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) Name name = PG_GETARG_NAME(0); bool immediately_reserve = PG_GETARG_BOOL(1); bool temporary = PG_GETARG_BOOL(2); + XLogRecPtr restart_lsn = PG_GETARG_LSN(3); Datum values[2]; bool nulls[2]; TupleDesc tupdesc; HeapTuple tuple; Datum result; + if (restart_lsn != InvalidXLogRecPtr && !immediately_reserve) + elog(ERROR, "immediately_reserve should not be false when setting restart_lsn"); + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); @@ -92,7 +102,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) create_physical_replication_slot(NameStr(*name), immediately_reserve, temporary, - InvalidXLogRecPtr); + restart_lsn, + restart_lsn != InvalidXLogRecPtr); values[0] = NameGetDatum(&MyReplicationSlot->data.name); nulls[0] = false; @@ -699,7 +710,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) create_physical_replication_slot(NameStr(*dst_name), true, temporary, - src_restart_lsn); + src_restart_lsn, + false); /* * Update the destination slot to current values of the source slot; diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 35f669b60d6..a865df29081 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -9772,10 +9772,10 @@ # replication slots { oid => '3779', descr => 'create a physical replication slot', proname => 'pg_create_physical_replication_slot', provolatile => 'v', - proparallel => 'u', prorettype => 'record', proargtypes => 'name bool bool', - proallargtypes => '{name,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,o,o}', - proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}', + proparallel => 'u', prorettype => 'record', proargtypes => 'name bool bool pg_lsn', + proallargtypes => '{name,bool,bool,pg_lsn,name,pg_lsn}', + proargmodes => '{i,i,i,i,o,o}', + proargnames => '{slot_name,immediately_reserve,temporary,restart_lsn,slot_name,lsn}', prosrc => 'pg_create_physical_replication_slot' }, { oid => '4220', descr => 'copy a physical replication slot, changing temporality', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 8fbddea78fd..2f581860dc7 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -194,6 +194,7 @@ extern void ReplicationSlotMarkDirty(void); /* misc stuff */ extern bool ReplicationSlotValidateName(const char *name, int elevel); extern void ReplicationSlotReserveWal(void); +extern void ReplicationSlotReserveHistoryWal(XLogRecPtr restart_lsn); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); diff --git a/src/test/recovery/t/019_historic_lsn.pl b/src/test/recovery/t/019_historic_lsn.pl new file mode 100644 index 00000000000..a4907cc3151 --- /dev/null +++ b/src/test/recovery/t/019_historic_lsn.pl @@ -0,0 +1,51 @@ +# test for check historic lsn. +# function "ReplicationSlotReserveHistoryWal". +# 019_historic_lsn.pl +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +# Initialize master node +my $node_master = get_new_node('master'); +$node_master->init( + has_archiving => 1, + allows_streaming => 1); + +$node_master->append_conf( + 'postgresql.conf', qq{ + max_wal_size = 32MB + min_wal_size = 32MB +}); +$node_master->start; + +my $lsn = $node_master->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();"); +my $i = 4; +while ($i-- != 0) +{ + $node_master->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_master->safe_psql('postgres', 'CHECKPOINT;'); +} +$node_master->safe_psql('postgres', 'CHECKPOINT;'); +my $check = $node_master->psql('postgres', "SELECT pg_create_physical_replication_slot('qwe', true, false, '$lsn'::pg_lsn);"); +is($check, '3', 'this physical slot should not be created'); + +$node_master->safe_psql('postgres', 'CHECKPOINT;'); +my $directory = $node_master->data_dir; +opendir(my $dh, "$directory/pg_wal"); +my @filelist = readdir($dh); + +my $lsn1 = $node_master->safe_psql('postgres', "SELECT pg_switch_wal();"); +$node_master->psql('postgres', "SELECT pg_create_physical_replication_slot('qwe', true, false, '$lsn1'::pg_lsn);"); +$i = 5; +while ($i-- != 0) +{ + $node_master->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_master->safe_psql('postgres', 'CHECKPOINT;'); +} +rewinddir($dh); +my @filelist1 = readdir($dh); +closedir($dh); +my $res = @filelist + 5; +is(@filelist1, $res, 'this physical slot must be created');